package com.foreveross.bsl.push.application.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;

import com.foreveross.bsl.common.utils.bean.Identities;
import com.foreveross.bsl.common.utils.mapper.JsonMapper;
import com.foreveross.bsl.push.application.CheckinMgmtService;
import com.foreveross.bsl.push.application.PushModuleException;
import com.foreveross.bsl.push.application.impl.channel.ChannelSelector;
import com.foreveross.bsl.push.application.impl.channel.PushChannelable;
import com.foreveross.bsl.push.application.impl.channel.SendBag;
import com.foreveross.bsl.push.application.impl.channel.SendBagBuilder;
import com.foreveross.bsl.push.application.impl.channel.SendTarget;
import com.foreveross.bsl.push.application.impl.channel.SendTarget.TargetIdentity;
import com.foreveross.bsl.push.application.vo.DeviceCheckinVo;
import com.foreveross.bsl.push.application.vo.MessageParams;
import com.foreveross.bsl.push.application.vo.MessageVo;
import com.foreveross.bsl.push.domain.Message;
import com.foreveross.bsl.push.domain.Message.MessageTypeEnum;
import com.foreveross.bsl.push.domain.ReceiverTypeEnum;
import com.foreveross.bsl.push.domain.entity.Push;
import com.foreveross.bsl.push.domain.entity.Push.PushStatusEnum;
import com.foreveross.bsl.push.domain.entity.PushRequest;
import com.foreveross.bsl.push.domain.entity.PushTarget;
import com.foreveross.bsl.push.repository.PushRepository;
import com.foreveross.bsl.push.repository.PushRequestRepository;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * 推送分发器
 *
 * @author Wangyi
 * @version v1.0
 *
 * @date 2013-7-16
 *
 */
@Service
public class DefaultPushDispatcher implements PushDispatcher {

	private static final Logger log = LoggerFactory.getLogger(DefaultPushDispatcher.class);

	@Autowired
	private ChannelSelector channelSelector;

	@Autowired
	private CheckinMgmtService deviceCheckinService;

	@Autowired
	private SendBagBuilder sendBagBuilder;

	private JsonMapper jsonMapper = JsonMapper.nonEmptyMapper();

	private final PushRepository getPushRepository() {
		return new Push().getEntityRepository(PushRepository.class);
	}

	private final PushRequestRepository getPushRequestRepository() {
		return new PushRequest().getEntityRepository(PushRequestRepository.class);
	}

	@Override
	public PushRequest dispatch(MessageParams msgParams) {
		// TODO 须实现异步,高性能的消息分发
		PushRequest pr = this.createPushRequest(msgParams);
		List<Push> pushs = null;
		Page<DeviceCheckinVo> page = null;
		int i = 1, totalValidPush=0;
		TimeUnit timeUnit=TimeUnit.MILLISECONDS;
		Stopwatch stopWatch=new Stopwatch();
		do {
			// 按页获取数据
			stopWatch.reset().start();
			page = deviceCheckinService.findTargets(i, 1000, msgParams);
			log.trace("findTarget() cost {}ms", stopWatch.elapsed(timeUnit));

			stopWatch.reset().start();
			pushs = this.createPushs(page.getContent(), pr);
			log.trace("createPushs() {} pushs cost {}ms", pushs.size(), stopWatch.elapsed(timeUnit));

			if(pushs.size() > 0) {
				totalValidPush += pushs.size();
				stopWatch.reset().start();
				// 构建发送袋
				List<SendBag> sendBags = this.sendBagBuilder.build(pr, pushs);
				log.trace("sendBagBuilder.build() cost {}ms", stopWatch.elapsed(timeUnit));

				stopWatch.reset().start();
				for (SendBag sb : sendBags) {
					asyncSend(sb);
				}
				log.trace("asyncSend() {} times cost {}ms", sendBags.size(), stopWatch.elapsed(timeUnit));
			}
			i++;
		} while (i <= page.getTotalPages());
		pr.setAffectedDevices(totalValidPush);

		stopWatch.reset().start();
		getPushRequestRepository().save(pr);
		log.trace("pushRequestRepository.save() cost {}ms", stopWatch.elapsed(timeUnit));
		stopWatch.stop();

		if(log.isTraceEnabled()){
			log.trace("原消息JSON: {}", jsonMapper.toJson(pr.getMessage()));
		}
		log.debug("有效推送数：{}", totalValidPush);
		return pr;
	}

	private final AtomicInteger msgSendCount = new AtomicInteger(); // 原子量号

	private void asyncSend(final SendBag sendBag) {
		// TODO 异步发送的临时解决方案，待优化。
		final String dsRouter = DsRouterHelper.getCurrentDsRouter();
		final Runnable sendTask = new Runnable() {
			@Override
			public void run() {
				DsRouterHelper.setCurrentDsRouter(dsRouter);
				sendBag.execute();
			}
		};
		Thread sendThread = new Thread(sendTask, sendBag.getChannel().getChannelId() + "@" + msgSendCount.incrementAndGet());
		sendThread.start();
	}

	private PushRequest createPushRequest(MessageParams msgParams) {
		final String id=Identities.uuid2();
		Validate.notBlank(id);
		PushRequest pr = new PushRequest();
		pr.setId(id);
		pr.setAppId(msgParams.getAppId());
		MessageVo msgVo = msgParams.getMessage();
		Message msg = new Message();
		msg.setId(id);
		msg.setContent(msgVo.getContent());
		// XXX 当MessageVo的extras里有map时，dozer的复制会出问题！ 待深入研究
		msg.setDirectExtrasPropertys(msgVo.getDirectExtrasPropertys());
		msg.setExtras(Maps.newHashMap(msgVo.getExtras()));
		msg.setMessageType(MessageTypeEnum.valueOf(msgVo.getMessageType().name()));
		msg.setTitle(msgVo.getTitle());
		pr.setMessage(msg);
		pr.setAppId(msgParams.getAppId());
		pr.setReceiverType(ReceiverTypeEnum.valueOf(msgParams.getReceiverType().name()));
		pr.setReceiverValue(msgParams.getReceiverValue());
		pr.setTags(msgParams.getTags());
		pr.setTimeToLive(msgParams.getTimeToLive());
		pr.setSubmitTime(new Date());
		getPushRequestRepository().save(pr);
		return pr;
	}

	private List<Push> createPushs(List<DeviceCheckinVo> devices, PushRequest pr) {
		List<Push> pushs = new ArrayList<Push>();
		for (DeviceCheckinVo dck : devices) {
			if(StringUtils.isEmpty(dck.getPushToken())){
				log.debug("跳过失效token的设备签到: deviceId:{},appId:{}", dck.getDeviceId(), dck.getAppId());
				continue;
			}
			Push push = new Push();
			push.setCreateTime(new Date());
			push.setRequest(pr);
			push.setTimeToLive(pr.getTimeToLive());
			if (push.getTimeToLive() >= 0) {
				push.setExpiredTime(new Date(push.getCreateTime().getTime() + push.getTimeToLive() * 1000));
			} else {
				push.setExpiredTime(push.getCreateTime());
			}
			push.setStatus(PushStatusEnum.PENDING);
			push.setTarget(new PushTarget(dck.getDeviceId(), dck.getAppId(), dck.getPushToken(), dck.getAlias()));
			push.setChannelId(dck.getChannelId());
			pushs.add(push);
		}
		// 保存推送列表
		this.getPushRepository().save(pushs);
		return pushs;
	}

	@Override
	public void dispatch(Push push) {
		if (push.getRequest() == null) {
			throw new PushModuleException("push对应的pushRequest对象不存在");
		}
		push.setRetryCount(push.getRetryCount() + 1);
		this.getPushRepository().save(push);
		PushTarget pt = push.getTarget();
		PushChannelable pc = channelSelector.selectChannel(push.getChannelId());
		SendTarget st = new SendTarget();
		st.setAppId(pt.getAppId());
		st.getTargets().add(new TargetIdentity(pt.getToken(), pt.getDeviceId(), push.getId()));
		pc.send(Sets.newHashSet(st), push.getRequest().getMessage());
	}

}
